package com.demo.component.mq;

import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.model.Message;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for worker threads that poll a {@link CloudQueue} in a loop and
 * hand each message body to {@link #deal(String)}.
 * <p>
 * Subclasses must provide two things:
 * <ol>
 *   <li>{@linkplain #BaseLoopQueueThread(CloudQueue) a constructor that injects the queue};</li>
 *   <li>{@linkplain #deal(String) the message-handling logic}.</li>
 * </ol>
 * Created by David Wang on 2016/2/29.
 */
public abstract class BaseLoopQueueThread implements Runnable {
    protected Logger logger = LoggerFactory.getLogger(getClass());

    /** Pause, in milliseconds, applied when the queue is empty or after a failed iteration. */
    private static final long SLEEP_TIME = 1000L;

    /** Queue this worker pops messages from and deletes them on. */
    private final CloudQueue queue;

    /**
     * Creates a worker bound to the given queue.
     *
     * @param queue the queue to poll
     */
    public BaseLoopQueueThread(CloudQueue queue) {
        this.queue = queue;
    }

    /**
     * Polls the queue until {@code MQSInitializing.isLoopFlag()} returns {@code false}
     * or the current thread is interrupted.
     * <p>
     * For each popped message:
     * <ul>
     *   <li>a message whose dequeue count is greater than 1 is deleted without being handled;</li>
     *   <li>a message with an empty body is deleted without being handled;</li>
     *   <li>otherwise the body is passed to {@link #deal(String)} and the message is deleted
     *       only when that call returns {@code true}.</li>
     * </ul>
     * Any exception raised during an iteration is logged and the loop continues after a
     * short pause, so a persistent failure does not turn into a busy loop.
     */
    @Override
    public void run() {
        while (MQSInitializing.isLoopFlag() && !Thread.currentThread().isInterrupted()) {
            try {
                Message msg = queue.popMessage();
                if (msg == null) { // nothing to consume right now
                    Thread.sleep(SLEEP_TIME);
                    continue;
                }

                String receiptHandle = msg.getReceiptHandle();
                // Number of times this message has been dequeued.
                int count = msg.getDequeueCount();
                if (count > 1) { // dequeued before: treat as already consumed and drop it
                    queue.deleteMessage(receiptHandle);
                    continue;
                }
                // Message payload.
                String msgStr = msg.getMessageBodyAsString();
                if (StringUtils.isEmpty(msgStr)) {
                    queue.deleteMessage(receiptHandle);
                    continue;
                }
                if (deal(msgStr)) {
                    queue.deleteMessage(receiptHandle);
                }
            } catch (InterruptedException e) {
                // Restore the flag so the owner of this thread can observe the interruption,
                // then stop polling instead of swallowing the request.
                Thread.currentThread().interrupt();
                logger.warn("Queue polling thread interrupted, stopping", e);
                return;
            } catch (Exception e) {
                logger.error(e.toString(), e);
                // Back off before retrying so repeated failures do not spin the CPU
                // or flood the log.
                if (!pauseAfterFailure()) {
                    return;
                }
            }
        }
    }

    /**
     * Sleeps for {@link #SLEEP_TIME} milliseconds after a failed iteration.
     *
     * @return {@code true} if the pause completed, {@code false} if the thread was
     *         interrupted while pausing (the interrupt flag is restored)
     */
    private boolean pauseAfterFailure() {
        try {
            Thread.sleep(SLEEP_TIME);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Handles the body of a single message. Only called with a non-empty body.
     *
     * @param msgStr the message body as a string
     * @return {@code true} if the message should be deleted from the queue,
     *         {@code false} to leave it undeleted
     */
    public abstract boolean deal(String msgStr);
}
